package com.rivues.task.resource;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rivues.core.RivuDataContext;
import com.rivues.module.platform.web.model.AnalyzerReport;
import com.rivues.module.platform.web.model.AnalyzerReportModel;
import com.rivues.module.platform.web.model.Database;
import com.rivues.module.platform.web.model.JobDetail;
import com.rivues.util.RivuTools;
import com.rivues.util.iface.report.R3Request;
import com.rivues.util.serialize.JSON;
import com.rivues.util.service.ServiceHelper;
import com.rivues.util.tools.ExportFile;
import com.rivues.util.tools.RivuCSVUtil;
import com.rivues.util.tools.RivuExcelUtil;
import com.rivues.util.tools.RivuPDFUtil;
import com.rivues.util.tools.RivuXMLUtil;

/**
 * Resource that exports an analyzer report to a file (pdf / excel / csv / xml)
 * on behalf of a background {@link JobDetail} task.
 *
 * CSV and Excel exports of paged datasources fetch pages in parallel on a
 * shared executor, throttled by a configurable semaphore; PDF/XML exports,
 * CUBE datasources and HIVE databases (which do not support paging) are
 * exported single-threaded.
 */
public class ExportReportResource extends Resource {
	private final Logger log = LoggerFactory.getLogger(ExportReportResource.class);
	private JobDetail job;
	private AnalyzerReport report = null;            // deserialized from job.getTaskinfo()
	private AnalyzerReportModel reportModel = null;  // model inside the report matching job.getDataid()
	private FileOutputStream outputFile;             // destination stream of the exported file
	private boolean export = false;                  // true once begin() has completed (consumed by next())
	private String dsType = null;                    // datasource type resolved by the report service
	private Database database = null;                // database descriptor, used to detect HIVE

	/**
	 * Shared cached-thread executor for parallel page fetching; idle threads
	 * terminate after 10 seconds, tasks are handed off directly.
	 */
	static final Executor fastExecutor = new ThreadPoolExecutor(
	          0,
	          Integer.MAX_VALUE,
	          10, TimeUnit.SECONDS, // terminate idle threads after 10 sec
	          new SynchronousQueue<Runnable>()  // directly hand off tasks
	          , new DefaultSolrThreadFactory("facetExecutor")
	  );


	/**
	 * Builds the resource from the job: parses the serialized report
	 * definition, locates the report model referenced by the job and prepares
	 * the exporter matching the requested output format
	 * ({@code job.getMemo()}: pdf / excel / csv / xml).
	 *
	 * @param job the export job carrying the serialized report definition
	 */
	public ExportReportResource(JobDetail job){
		this.job = job ;
		report = JSON.parseObject(job.getTaskinfo(), AnalyzerReport.class) ;

		// Locate the report model this job refers to.
    	if(report.getModel().size()>0){
    		for(AnalyzerReportModel model : report.getModel()){
    			if(model.getId().equals(job.getDataid())){
    				reportModel = model ;
    				break;
    			}
    		}
    	}
    	if(reportModel!=null){
			try {
				dsType = ServiceHelper.getReportService().getDsType(new R3Request() , this.reportModel, this.report, true , null , false , false) ;
				database = ServiceHelper.getReportService().getDatabaseType(new R3Request() , this.reportModel, this.report, true , null , false , false) ;
				outputFile = new FileOutputStream(new File(RivuTools.getDefaultExportFilePath(job.getOrgi()), job.getDicid()));
				// Select the exporter implementation for the requested format;
				// "x".equals(memo) is null-safe, unlike memo.equals("x").
				ExportFile exportFile = null ;
				String memo = job.getMemo() ;
				if("pdf".equals(memo)){
					exportFile = new RivuPDFUtil(reportModel, report , outputFile);
				}else if("excel".equals(memo)){
					exportFile = new RivuExcelUtil(reportModel, report , outputFile);
				}else if("csv".equals(memo)){
					exportFile = new RivuCSVUtil(reportModel, report , outputFile);
				}else if("xml".equals(memo)){
					exportFile = new RivuXMLUtil(reportModel, report , outputFile);
				}
	    		reportModel.setExport(exportFile) ;
			} catch (Exception e) {
				log.error("Failed to initialize export for job " + job.getId(), e);
			}
    	}
	}

	/**
	 * Runs the export: fetches report data (single-threaded, or as parallel
	 * pages throttled by a semaphore, depending on format and datasource)
	 * and writes it through the exporter. The {@code finally} block always
	 * closes the exporter/stream and either deletes the partial file (job
	 * cancelled) or persists the JobDetail for the management UI.
	 */
	@Override
	public void begin() throws Exception {
		try {
			if(dsType!=null){
				reportModel.getExport().setJobDetail(job);
				String memo = job.getMemo() ;
				// XML/PDF exports and CUBE datasources are exported single-threaded.
				if("xml".equals(memo) || "pdf".equals(memo) || dsType.equals(RivuDataContext.CubeEnum.CUBE.toString())){
					reportModel.getExport().setReportData(ServiceHelper.getReportService().service(new R3Request() , this.reportModel, this.report, true , null , false , false) );
					reportModel.getExport().createFile(!RivuDataContext.DataBaseTYPEEnum.HIVE.toString().equals(reportModel.getDbType()));
				}else{
					// Page size and concurrency (semaphore permits) come from configuration.
					String fetcherSize = null ;
					String sNum = "1" ;	// semaphore permits, i.e. max concurrent page fetches
					if("csv".equals(memo)){
						fetcherSize = RivuTools.getReportConfigureParam("system.config.report.CsvExportPagesize", reportModel.getReportid(), reportModel.getOrgi()) ;
						sNum = RivuTools.getReportConfigureParam("system.config.report.CsvExportSemaphore", reportModel.getReportid(), reportModel.getOrgi()) ;
					}else if("excel".equals(memo)){
						fetcherSize = RivuTools.getReportConfigureParam("system.config.report.ExcelExportPagesize", reportModel.getReportid(), reportModel.getOrgi()) ;
						sNum = RivuTools.getReportConfigureParam("system.config.report.ExcelExportSemaphore", reportModel.getReportid(), reportModel.getOrgi()) ;
					}
					// Fall back to a single permit when the configured value is
					// missing or not a plain decimal number.
					if(sNum == null || !sNum.matches("[\\d]{1,}")){
						sNum = "1" ;
					}
					/**
					 * Determine the total page count, then fetch every page under the
					 * semaphore. HIVE databases do not support paging: export
					 * single-threaded with one "page" of Integer.MAX_VALUE rows.
					 */
					boolean notsupportpage = database!=null && RivuDataContext.DataBaseTYPEEnum.HIVE.toString().equalsIgnoreCase(database.getSqldialect()) ;
					final int fetchs =  notsupportpage ? Integer.MAX_VALUE :  fetcherSize!=null && fetcherSize.matches("[\\d]{1,}")?Integer.parseInt(fetcherSize):10000 ;
					boolean hasNextPage = true ;

					if(notsupportpage){
						sNum = "1" ;
					}

					final Semaphore semaphore = new Semaphore(Integer.parseInt(sNum));
					final AtomicInteger fetch = new AtomicInteger() ;	// next page number; 0 before the first increment
					while(hasNextPage && job.isFetcher()){
						if(this.job.getUsearea()!=null && this.job.getUsearea().matches("[\\d]{1,}")){
							// Total row count is known: dispatch all remaining pages in
							// parallel, throttled by the semaphore.
							long total = Long.parseLong(this.job.getUsearea()) ;
							if(fetch.intValue() <= (total / fetchs)){
								List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
								for(int i = fetch.intValue() ; i<= (total / fetchs) ; i++){

									Callable<Integer> callable = new Callable<Integer>() {
										@Override
										public Integer call() throws Exception {
											try {
												// Each task claims the next page number atomically.
												RivuTools.getRSReportData(report, null, reportModel, fetch.incrementAndGet(), fetchs ,reportModel.getExport());
											} catch (Exception e) {
												// Do not abort the whole export for one failed page,
												// but leave a trace instead of swallowing silently.
												log.error("Failed to fetch a report page for job " + job.getId(), e);
											} finally {
												semaphore.release();
											}
											return Integer.valueOf(0);
										}
									};

									RunnableFuture<Integer> runnableFuture = new FutureTask<Integer>(callable);

									try {
										semaphore.acquire();
									} catch (InterruptedException e) {
										Thread.currentThread().interrupt();	// preserve interrupt status
										log.error("Interrupted while waiting for a fetch permit", e);
									}

									fastExecutor.execute(runnableFuture);// releases semaphore
									futures.add(runnableFuture);
								}

								// Wait for every page to finish before closing the export.
								for (Future<Integer> future : futures) {
									future.get() ;
								}
							}
							hasNextPage = false ;	// all pages dispatched, stop looping
						}else{
							// Total not known yet: fetch the first page single-threaded so
							// the job learns the page count, then re-enter the loop.
							RivuTools.getRSReportData(report, null, reportModel, 1, fetchs ,reportModel.getExport());
							hasNextPage = true ;
							fetch.incrementAndGet() ;
						}
					}
				}

    		}
		} catch (Exception e) {
			log.error("Report export failed for job " + job.getId(), e);
		}finally{
			export = true ;
			Map<String, Object> jobDetailMap = RivuDataContext.getClusterInstance().get(RivuDataContext.DistributeEventEnum.RUNNINGJOB.toString());
			if(!((JobDetail)(jobDetailMap.get(job.getId()))).isFetcher()){
				// Job was cancelled: discard the partial output file.
				new File(RivuTools.getDefaultExportFilePath(job.getOrgi()), job.getDicid()).delete();
			}else{
				/**
				 * Persist the JobDetail as well, so the backend can provide
				 * list-management of finished exports.
				 */
				File jobFilePath = RivuTools.getDefaultExportJobPath(job.getOrgi()) ;
				if(!jobFilePath.exists()){
					jobFilePath.mkdirs() ;
				}
				FileUtils.writeByteArrayToFile(new File(jobFilePath , job.getDicid()+".job") , RivuTools.toBytes(job));
			}
			// Guard against NPE: reportModel may be null, and the exporter may
			// never have been created (unknown memo / constructor failure).
			if(reportModel!=null && reportModel.getExport()!=null){
				reportModel.getExport().close();	// flush buffered data to the file
			}
			if(outputFile!=null){
				try {
					outputFile.close();
				} catch (IOException e) {
					log.error("Failed to close export output stream", e);
				}
			}
		}
	}

	/** Resets the report page counter when the task ends. */
	@Override
	public void end(boolean clear) throws Exception {
		this.job.getReport().setPages(0) ;
	}

	@Override
	public JobDetail getJob() {
		return job;
	}

	@Override
	public void process(OutputTextFormat meta, JobDetail job) {
		// Nothing to process: this resource writes directly to the export file.
	}

	/**
	 * Returns one OutputTextFormat before the export has run, then null —
	 * so the scheduler invokes this resource exactly once.
	 */
	@Override
	public OutputTextFormat next() throws Exception {
		OutputTextFormat outTextFormat = null ;
		if(export == false){
			outTextFormat = new OutputTextFormat(job) ;
		}
		return outTextFormat;
	}

	@Override
	public boolean isAvailable() {
		return true;
	}

	@Override
	public OutputTextFormat getText(OutputTextFormat object) throws Exception {
		return object;
	}

	@Override
	public void rmResource() {
		// No extra resources to release; streams are closed in begin()/updateTask().
	}

	/** Closes the output stream when the task is updated/aborted. */
	@Override
	public void updateTask() {
		try {
			// Guard against NPE when the constructor failed before opening the stream.
			if(this.outputFile!=null){
				this.outputFile.close();
			}
		} catch (IOException e) {
			log.error("Failed to close export output stream", e);
		}
	}


}
